package com.kool.kmqtt.server.processer;

import com.kool.kmqtt.server.PacketSender;
import com.kool.kmqtt.server.ServerConfig;
import com.kool.kmqtt.server.constant.PacketTypeEnum;
import com.kool.kmqtt.server.encoder.PubackPacketEncoder;
import com.kool.kmqtt.server.encoder.PubrecPacketEncoder;
import com.kool.kmqtt.server.exception.ErrorCode;
import com.kool.kmqtt.server.exception.ProtocolException;
import com.kool.kmqtt.server.packet.*;
import com.kool.kmqtt.server.parser.PacketParser;
import com.kool.kmqtt.service.KauthService;
import com.kool.kmqtt.service.request.TopicAuthReq;
import com.kool.kmqtt.service.vo.TopicAuthResp;
import com.kool.kmqtt.util.SpringUtil;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;

import java.util.regex.Pattern;

/**
 * PUBLISH报文处理器
 */
@Slf4j
public class PublishPacketProcessor extends PacketProcessor {
    public PublishPacketProcessor(ChannelHandlerContext ctx, PacketParser packetParser) {
        super(ctx, packetParser);
    }

    @Override
    protected void validate(Packet packet) {
        //对于QoS 0的消息，DUP标志必须设置为0
        int qos = packet.getFixedHeader().getQoS();
        boolean dup = packet.getFixedHeader().getDup();
        if (qos == 0 && dup) {
            throw new ProtocolException(ErrorCode.DUP_ERROR);
        }

        //QoS 必须等于 0/1/2
        if (qos > 2 || qos < 0) {
            throw new ProtocolException(ErrorCode.QOS_ERROR);
        }

        PublishVariableHeader variableHeader = (PublishVariableHeader) packet.getVariableHeader();
        String topicName = variableHeader.getTopicName();
        //PUBLISH报文中的主题名不能包含通配符
        if (Pattern.matches("[\\S\\s]*(\\+|#)[\\S\\s]*", topicName)) {
            throw new ProtocolException(ErrorCode.TOPIC_NAME_ERROR, "主题名不能包含通配符");
        }
    }

    @Override
    protected void processPacket(Packet packet) {

        Integer packetId = packet.getPacketId();
        String clientId = sessionContext.getClientId();
        int qos = packet.getFixedHeader().getQoS();
        boolean retain = packet.getFixedHeader().getRetain();
        String topicName = ((PublishVariableHeader) packet.getVariableHeader()).getTopicName();

        //验证主题权限
        if (ServerConfig.getInstance().getTopicAuthSwitch()) {
            String userName = sessionContext.getUserName();

            TopicAuthReq request = new TopicAuthReq();
            request.setUserName(userName);
            request.setTopicName(topicName);
            request.setAction("publish");

            KauthService kauthService = SpringUtil.getBean(KauthService.class);
            TopicAuthResp resp = kauthService.topicAuth(request);
            if (resp.getIsSuccess() != 1) {
                log.info("主题权限验证不通过，不发布：用户名={}，主题名={}", userName, topicName);
                //直接响应，丢弃PUBLISH报文
                if (qos == 1) {
                    sendPuback(packetId);
                } else if (qos == 2) {
                    sendPubrec(packetId);
                }
                return;
                //or 断开连接？
//                    throw new AppException(ErrorCode.NO_TOPIC_PERMIT);
            }
        }
        //保存入站QoS=2的入站未确认PUBLISH消息（收到PUBREL后删除）
        if (qos == 2) {
            repository.saveReceivePacket(clientId, packet);
        }

        /**
         * 1.如果服务端收到一条保留（RETAIN）标志为1的QoS 0消息，它必须丢弃之前为那个主题保留的任何消息,
         * 它应该将这个新的QoS 0消息当作那个主题的新保留消息
         *
         * 2.收到保留标志为1且有效载荷为零字节的PUBLISH报文，同一个主题下任何现存的保留消息必须被移除
         */
        if (retain && qos == 0) {
            repository.deleteRetainPackets(topicName);
            repository.saveRetainPacket(topicName, packet);
        } else if (retain && packet.getFixedHeader().getRemainingLength() - packet.getVariableHeaderLength() == 0) {
            repository.deleteRetainPackets(topicName);
        } else if (retain) {
            repository.saveRetainPacket(topicName, packet);
        }

        //入站QoS=0、1时发送PUBLISH
        if (qos == 0 || qos == 1) {
            //分发PUBLISH报文
            PublishUtil.sendPublishToSubscribers(packet);
        }

        //根据入站Qos响应PUBACK(QoS=1)、PUBREC(QoS=2)
        if (qos == 1) {
            sendPuback(packetId);
        } else if (qos == 2) {
            sendPubrec(packetId);
        }
    }

    private void sendPubrec(int packetId) {
        FixedHeader fixedHeader = new FixedHeader();
        fixedHeader.setPacketType(PacketTypeEnum.PUBREC.getCode());
        fixedHeader.setFlags(0);
        fixedHeader.setRemainingLength(2);

        PubrecVariableHeader variableHeader = new PubrecVariableHeader();
        variableHeader.setPacketId(packetId);

        Packet packet = new Packet();
        packet.setFixedHeader(fixedHeader);
        packet.setVariableHeader(variableHeader);

        PacketSender packetSender = new PacketSender(sessionContext, new PubrecPacketEncoder());
        packetSender.send(packet);

    }

    private void sendPuback(int packetId) {
        FixedHeader fixedHeader = new FixedHeader();
        fixedHeader.setPacketType(PacketTypeEnum.PUBACK.getCode());
        fixedHeader.setFlags(0);
        fixedHeader.setRemainingLength(2);

        PubackVariableHeader variableHeader = new PubackVariableHeader();
        variableHeader.setPacketId(packetId);

        Packet packet = new Packet();
        packet.setFixedHeader(fixedHeader);
        packet.setVariableHeader(variableHeader);

        PacketSender packetSender = new PacketSender(sessionContext, new PubackPacketEncoder());
        packetSender.send(packet);
    }
}
